AWS WAFのフルログをAthenaで分析できるようにしてみた
こんにちは、臼田です。
皆さん、WAFWAFしてますか?(思いつき
ついにAWS WAFでフルログを取得することができるようになりました!すっごいわくわくしてます。
これまでは頑張ってsampleログを取得してゴニョゴニョしていましたが、これからはKinesis Data Firehoseにつっこんでやりたい放題です!
今回はとりあえずそのままS3に吐き出されたログをAthenaで分析できるようにしてみます。
ログをS3に入れるところまでの方法は上記記事を参考にしてください。
テーブル作成
ログのフォーマットはこちらにあります。
CREATE TABLE
は下記のようになります。
2020/01/05追記 XSS/SQLiマッチルール詳細追加によるスキーマ変更を反映しました
2018/12/24追記 例外ルール設定追加によるスキーマ変更を反映しました
CREATE EXTERNAL TABLE IF NOT EXISTS waflogs ( `timestamp` bigint, formatVersion int, webaclId string, terminatingRuleId string, terminatingRuleType string, action string, terminatingRuleMatchDetails array < struct < conditionType: string, location: string, matchedData: array < string > > >, httpSourceName string, httpSourceId string, ruleGroupList array < struct < ruleGroupId: string, terminatingRule: struct < ruleId: string, action: string >, nonTerminatingMatchingRules: array < struct < action: string, ruleId: string > >, excludedRules: array < struct < exclusionType: string, ruleId: string > > > >, rateBasedRuleList array < struct < rateBasedRuleId: string, limitKey: string, maxRateAllowed: int > >, nonTerminatingMatchingRules array < struct < action: string, ruleId: string > >, httpRequest struct < clientIp: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpVersion: string, httpMethod: string, requestId: string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION 's3://[バケット名]/';
いくつかログのフォーマットについて説明します。
timestamp
sampleログをSDKなどで取得していたらあまり気にすることはなかったのですが、フルログではunixtimeで格納されます。ちなみに、1535785722145
のようにミリ秒付きなので、加工に困ります。
terminatingRule
terminatingRuleId
やterminatingRuleType
は実際に最終的にリクエストが処理されたRuleに関するログです。途中で通過したRuleは2箇所のnonTerminatingMatchingRules
に出てきます。どのルールにも引っかからなければId
はDefault_Action
になります。
excludedRules
例外ルール設定を行っている際にそのルールが出力されます。
action
WAFを利用しているとactionとしてはALLOW
, BLOCK
, COUNT
の3つが出てきますが、上記仕様上最終処理まで行ったあとのログのため、全体としてのactionはCOUNT
になりません。nonTerminatingMatchingRules
のactionのみCOUNT
となります。
terminatingRuleMatchDetails
XSS/SQLiルールでBLOCKした場合に詳細情報が出力されます。conditionType
はXSSかSQL_INJECTIONが入ります。location
はヘッダやBodyなどマッチした箇所が入ります。matchedData
はマッチした値が入ります。
httpSource
CloudFrontのwebACLか、ALBのwebACLかがわかります。
ruleGroupList
AWS WAFのRuleにはRule・RuleGroup・RateBasedがあります。RuleGroupはマネージドルールやFirewall Managerのルールで、通常WAF画面で設定するものがRuleです。普段は意識することはないですが、API等では別れているので注意が必要です。ログでも別れています。
uri・args
sampleログの時はURLにまとめてQueryStringが格納されていましたが、フルログでは分かれるようになっています。使いやすくなった感じですね。
よく使うクエリ
とりあえず一覧
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs;
単純に*
するとtimestampがunixtimeでしか表示されず見づらいので、先頭にJSTの時間をつけてみました。
コツはミリ秒があるunixtimeなので、1000で割ってあげるのと(ミリ秒切り捨てになるけど見る分にはそこまで気にしない)from_unixtime
でtimestamp型にすると共にJSTに変換します。
timestampによる絞り込み
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs WHERE date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) = date '2018-09-02' ORDER BY timestamp;
こちらも先ほどと同じで時間を条件にするためにJSTに変換してから比較しています。ORDER BY
は特に気にしなくていいのでそのままtimestampを使っています。
BLOCKのログ
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs WHERE action = 'BLOCK';
COUNTのログ
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs, UNNEST(nonTerminatingMatchingRules) t(nonTermRule) WHERE nonTermRule.action = 'COUNT';
通常RuleのCOUNTを取り出すクエリ。nonTerminatingMatchingRules
にUNNESTを使って頑張っています。COUNTが出るところはruleGroupListの中にもあるけど、こちらのCOUNTもやろうとするとすごく頑張らないといけないので割愛。
日本から来ていてBLOCKされたログ
海外ならいざ知らず、日本から来ていたら誤検知かもしれないので確認したくなります。
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs WHERE httpRequest.country = 'JP' AND action = 'BLOCK';
国毎のログ数の集計
JP以外が多いと、気になりますね。明らかに必要なければGeoIPのルールを手前に入れて止めることも検討します。
SELECT httpRequest.country, COUNT(httpRequest.country) AS count FROM waflogs GROUP BY httpRequest.country ORDER BY count DESC;
まとめ
AWS WAFのフルログをAthenaで分析できるようにしてみました。ついでにいくつか参考クエリを用意してみました。
これまではログの一部しか取得できていませんでしたが、すべてのログが取れることで分析上で正確な情報を出すことができるので、バンバン使っていきたいですね!